{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create entry points to spark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark import SparkContext\n",
    "sc = SparkContext(master = 'local')\n",
    "\n",
    "from pyspark.sql import SparkSession\n",
    "spark = SparkSession.builder \\\n",
    "          .appName(\"Python Spark SQL basic example\") \\\n",
    "          .config(\"spark.some.config.option\", \"some-value\") \\\n",
    "          .getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Linear regression without cross-valiation"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Import data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+-----+---------+-----+\n",
      "|   TV|Radio|Newspaper|Sales|\n",
      "+-----+-----+---------+-----+\n",
      "|230.1| 37.8|     69.2| 22.1|\n",
      "| 44.5| 39.3|     45.1| 10.4|\n",
      "| 17.2| 45.9|     69.3|  9.3|\n",
      "|151.5| 41.3|     58.5| 18.5|\n",
      "|180.8| 10.8|     58.4| 12.9|\n",
      "+-----+-----+---------+-----+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "ad = spark.read.csv('data/Advertising.csv', header=True, inferSchema=True)\n",
    "ad.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Transform data structure"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------+-----+\n",
      "|         features|label|\n",
      "+-----------------+-----+\n",
      "|[230.1,37.8,69.2]| 22.1|\n",
      "| [44.5,39.3,45.1]| 10.4|\n",
      "| [17.2,45.9,69.3]|  9.3|\n",
      "|[151.5,41.3,58.5]| 18.5|\n",
      "|[180.8,10.8,58.4]| 12.9|\n",
      "+-----------------+-----+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.linalg import Vectors\n",
    "ad_df = ad.rdd.map(lambda x: [Vectors.dense(x[0:3]), x[-1]]).toDF(['features', 'label'])\n",
    "ad_df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Build linear regression model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.regression import LinearRegression\n",
    "lr = LinearRegression(featuresCol = 'features', labelCol = 'label')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Fit the model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "lr_model = lr.fit(ad_df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prediction"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------+-----+------------------+\n",
      "|         features|label|        prediction|\n",
      "+-----------------+-----+------------------+\n",
      "|[230.1,37.8,69.2]| 22.1| 20.52397440971517|\n",
      "| [44.5,39.3,45.1]| 10.4|12.337854820894362|\n",
      "| [17.2,45.9,69.3]|  9.3|12.307670779994238|\n",
      "|[151.5,41.3,58.5]| 18.5| 17.59782951168913|\n",
      "|[180.8,10.8,58.4]| 12.9|13.188671856831299|\n",
      "+-----------------+-----+------------------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "pred = lr_model.transform(ad_df)\n",
    "pred.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Module evaluation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0.897210638178952"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyspark.ml.evaluation import RegressionEvaluator \n",
    "evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='label')\n",
    "evaluator.setMetricName('r2').evaluate(pred)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Compare results with R\n",
    "The comparison below shows that the linear regression analyses from pyspark and R obtained very close results."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```{r}\n",
    "# intercept and coefficients from R\n",
    "advertise = read.csv('data/Advertising.csv', header = TRUE)\n",
    "lr_ad = lm(Sales~., data = advertise)\n",
    "lr_ad$coefficients\n",
    "\n",
    " (Intercept)           TV        Radio    Newspaper \n",
    " 2.938889369  0.045764645  0.188530017 -0.001037493\n",
    " \n",
    "# intercept and coefficents from pyspark\n",
    "lr_model.intercept\n",
    "\n",
    "2.9388893694594134\n",
    "\n",
    "lr_model.coefficients\n",
    "\n",
    "DenseVector([0.0458, 0.1885, -0.001])\n",
    "\n",
    "# R squared from R\n",
    "summary(lr_ad)$r.squared\n",
    "\n",
    "0.8972106\n",
    "\n",
    "# R squared from pyspark\n",
    "evaluator.evaluate(ad_pred, {evaluator.metricName: \"r2\"})\n",
    "\n",
    "0.897210638178952\n",
    "\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Linear regression with cross-validation"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Training and test datasets"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "training, test = ad_df.randomSplit([0.8, 0.2], seed=123)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Build cross-validation model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "##=====build cross valiation model======\n",
    "\n",
    "# estimator\n",
    "lr = LinearRegression(featuresCol = 'features', labelCol = 'label')\n",
    "\n",
    "# parameter grid\n",
    "from pyspark.ml.tuning import ParamGridBuilder\n",
    "param_grid = ParamGridBuilder().\\\n",
    "    addGrid(lr.regParam, [0, 0.5, 1]).\\\n",
    "    addGrid(lr.elasticNetParam, [0, 0.5, 1]).\\\n",
    "    build()\n",
    "    \n",
    "# evaluator\n",
    "evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='r2')\n",
    "\n",
    "# cross-validation model\n",
    "from pyspark.ml.tuning import CrossValidator\n",
    "cv = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=4)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Fit cross-validation model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "cv_model = cv.fit(training)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prediction"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "pred_training_cv = cv_model.transform(training)\n",
    "pred_test_cv = cv_model.transform(test)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Evaluation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0.8982486958337326"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# performance on training data\n",
    "evaluator.setMetricName('r2').evaluate(pred_training_cv)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0.8896562076565583"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# performance on test data\n",
    "evaluator.setMetricName('r2').evaluate(pred_test_cv)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Intercept and coefficients"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Intercept:  3.075068686285647 \n",
      " coefficients:  [0.0465074974309,0.180854522465,-0.00107520549074]\n"
     ]
    }
   ],
   "source": [
    "print('Intercept: ', cv_model.bestModel.intercept, \"\\n\",\n",
    "     'coefficients: ', cv_model.bestModel.coefficients)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Get parameter values from the best model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Parameters can be extracted by calling the java property."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "best regParam: 0.0\n",
      "best ElasticNetParam:0.0\n"
     ]
    }
   ],
   "source": [
    "print('best regParam: ' + str(cv_model.bestModel._java_obj.getRegParam()) + \"\\n\" +\n",
    "     'best ElasticNetParam:' + str(cv_model.bestModel._java_obj.getElasticNetParam()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
